LambdaでCloudFrontアクセスログを展開しログファイルの形式を変換してみた
CloudFrontアクセスログはS3に出力することができますが、ログはgzipで圧縮され、TSV形式になっています。
本エントリでは、アクセスログの出力をトリガーに、Lambda関数を起動させ、gzipを展開、TSV形式からCSV形式に変換し、S3にアップロードしてみたいと思います。
構成&前提
構成は以下となります。
アクセスログの出力先バケットは単一で、CloudFrontディストリビューション毎にログ出力先のプレフィックスを指定します。Lambda関数を起動するS3トリガーではプレフィックスを指定し、CSV形式に変換後に出力するバケットは、アクセスログ出力先バケットと同一で、フォルダ *1が異なる構成です。本エントリでは、CloudFront等は既に構築されている前提(アクセスログ出力先のフォルダも作成済み)で、Lambda関数の作成を中心にご紹介したいと思います。
テスト用にCloudFront環境がさくっとほしい方に向け、こちらにCFnテンプレートのサンプルを置いておきました。
Lambda関数作成
ログ形式を変換するLambda関数をマネジメントコンソールより作成します。Lambda関数のランタイムはPython 3.7
を利用します。
任意の関数名を入力し、Lambda関数にロールを指定します。ロールにアタッチするポリシーは、AWS管理ポリシーAWSLambdaExecute
と、AmazonS3FullAccess
を指定します。(ロールに付与する権限(ポリシー)は、要件に応じ見直してください。)
S3イベントでLambda関数を起動するため、「トリガーを追加」をクリックします。
トリガーにて「S3」を選択します。
トリガーの設定では以下を指定し、トリガーを追加します。
- バケット:CloudFrontアクセスログが出力されるバケットを指定
- イベントタイプ:すべてのオブジェクト作成イベント
- プレフィックス:ログ格納フォルダを指定(ここでは、original/neko.tk)
- サフィックス:.gz
トリガーが追加されると、S3が表示されます。
該当のバケットからも確認することができます。
Lambda関数に処理させるコードを作成します。
ここでは、以下のコードを貼り付けます。
import os import boto3 import logging import gzip import codecs import re s3 = boto3.resource('s3') s3_client = boto3.client('s3') logger = logging.getLogger() logger.setLevel(logging.ERROR) def lambda_handler(event, context): logger.info('## event') logger.info(event) output_folder = os.environ['S3_OUTPUT_FOLDER'] # アップロード先フォルダ名(Lambda環境変数で指定) source_bucket = event['Records'][0]['s3']['bucket']['name'] # Lambda関数呼び出し元バケット名 source_key = event['Records'][0]['s3']['object']['key'] # オブジェクトキー取得 result = source_key.split('/') # オブジェクトキーから、フォルダ名とファイル名を分離 upper_folder_name = result[0] # 上位フォルダ名 lower_folder_name = result[1] # 下位フォルダ名 cf_log_name_gz = result[2] # CloudFrontアクセスログ名 cf_log_name_csv = cf_log_name_gz.replace('.gz','.csv') # 展開後のファイル名 # カレントディレクトリ移動 os.chdir('/tmp') # CloudFrontログダウンロード source_bucket_obj = s3.Bucket(source_bucket) source_bucket_obj.download_file(source_key, cf_log_name_gz) # gzip展開してtsvからcsvに変換 with gzip.open(cf_log_name_gz,mode='rb') as f: reader = codecs.getreader("utf-8") contents = reader(f) with open(cf_log_name_csv,mode='w',encoding='utf-8',newline='\n') as newf: # タブを「,」に置換 sub_text = re.sub('\t', ',', contents.read()) newf.write(sub_text) #パス結合 s3_object_key = os.path.join(output_folder, lower_folder_name) ful_s3_object_key = os.path.join(s3_object_key, cf_log_name_csv) # 宛先バケットでオブジェクト(フォルダ)をリストして、フォルダの存在チェック res_list_objects = s3_client.list_objects_v2( Bucket = source_bucket, Prefix = s3_object_key ) # アップロード先バケットのフォルダ有無確認 if res_list_objects.get('Contents'): # csvファイルアップロード s3_client.upload_file(cf_log_name_csv,source_bucket ,ful_s3_object_key) else: # アップロード先バケットにフォルダ作成 source_bucket_obj.put_object(Key=s3_object_key + '/') # csvファイルアップロード s3_client.upload_file(cf_log_name_csv,source_bucket ,ful_s3_object_key)
環境変数を指定します。ここでは、アップロード先のフォルダを環境変数で定義しています。キーにS3_OUTPUT_FOLDER
、値にungzip
を指定します。
その他の項目についてはデフォルトのままで「保存」します。Lambda関数の設定は以上です。
確認
動作を確認したいと思います。事前に変換後のアクセスログが出力されるフォルダを確認していました。Lambda関数起動前に確認していたのでフォルダ内にオブジェクトはありません。
アクセスログが出力されるよう、CloudFrontにアクセスしてみました。
少しすると、アクセスログが出力されます。
アクセスログ出力をトリガーにLambda関数が起動し、CloudFrontログ出力先で指定したプレフィックと同一名称のフォルダが作成されています。
フォルダ配下を確認すると、変換後のアクセスログがアップロードされています。
変換前後のアクセスログを比較し、TSV形式からCSV形式に変換されていることを確認しました。
CloudWatch Logsでは、該当のLambda関数が出力したログを確認することができます。
なお、変換対象を追加したい時は、Lambda関数にてS3トリガーの設定を追加してください。
最後に
Lambda関数を利用し、CloudFrontアクセスログを展開/変換する手順をご紹介しました。3rdパーティ製品にアクセスログを取り込みたい場合等、gzip形式では都合がわるい時に、本エントリがお役に立てれば幸いです。